package cn.wellt.mqtt;

import cn.wellt.mqtt.callback.MqttMessageListener;
import cn.wellt.mqtt.config.MqttMessageListenerHolder;
import cn.wellt.mqtt.properties.MqttConfigProperties;
import cn.wellt.mqtt.util.SpringContextHolder;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessageHeaders;
import org.springframework.util.StringUtils;

import java.time.Instant;
import java.util.Map;
import java.util.Objects;

/**
 * mqtt配置类
 *
 * @author caojingchen
 * @date 2021/02/01 11:24
 */
@Configuration
@IntegrationComponentScan
@ComponentScan(basePackages = "cn.wellt.mqtt.util")
@ConditionalOnProperty(value = "mqtt.host")
@EnableConfigurationProperties(MqttConfigProperties.class)
public class MqttAutoConfiguration {
    private final MqttConfigProperties configProperties;

    public MqttAutoConfiguration(MqttConfigProperties configProperties) {
        this.configProperties = configProperties;
    }

    /**
     * mqtt 连接配置
     */
    @Bean
    public MqttConnectOptions getMqttConnectOptions() {
        MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
        mqttConnectOptions.setCleanSession(true);
        mqttConnectOptions.setConnectionTimeout(configProperties.getTimeout());
        mqttConnectOptions.setKeepAliveInterval(90);
        mqttConnectOptions.setAutomaticReconnect(true);
        if (StringUtils.hasText(configProperties.getUsername())) {
            mqttConnectOptions.setUserName(configProperties.getUsername());
        }
        if (StringUtils.hasText(configProperties.getPassword())) {
            mqttConnectOptions.setPassword(configProperties.getPassword().toCharArray());
        }
        mqttConnectOptions.setServerURIs(configProperties.getHost().split(","));
        return mqttConnectOptions;
    }

    /**
     * mqtt连接工厂
     */
    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        factory.setConnectionOptions(getMqttConnectOptions());
        return factory;
    }

    /**
     * 入通道
     */
    @Bean
    public MessageChannel mqttInboundChannel() {
        return new DirectChannel();
    }

    /**
     * 入消息适配器
     */
    @Bean
    public MessageProducer mqttInboundAdaptor() {
        MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(configProperties.getInboundClientPrefix() + Instant.now().getEpochSecond(), mqttClientFactory());
        adapter.setCompletionTimeout(configProperties.getCompletionTimeout());
        adapter.setOutputChannel(mqttInboundChannel());
        DefaultPahoMessageConverter converter = new DefaultPahoMessageConverter();
        converter.setPayloadAsBytes(true);
        adapter.setConverter(converter);
        Map<String, String> topicMap = MqttMessageListenerHolder.getListeners();
        Map<String, Integer> qosMap = MqttMessageListenerHolder.getQoss();

        for (String topic : topicMap.keySet()) {
            Integer qos = qosMap.get(topic);
            adapter.addTopic(topic, qos);
        }
        return adapter;
    }

    @Bean
    @ServiceActivator(inputChannel = "mqttInboundChannel")
    public MessageHandler mqttInboundMessageHandler() {
        return message -> {
            MessageHeaders headers = message.getHeaders();
            String topic = Objects.requireNonNull(headers.get(MqttHeaders.RECEIVED_TOPIC)).toString();
            String listenerName = MqttMessageListenerHolder.getListener(topic);
            if (!StringUtils.hasText(listenerName)) {
                // 如何支持模糊查询
                return;
            }
            MqttMessageListener listener = null;
            try {
                listener = (MqttMessageListener) SpringContextHolder.getBean(Class.forName(listenerName));
                MqttMessage mqttMessage = new MqttMessage((byte[]) message.getPayload());
                mqttMessage.setId(Integer.parseInt(Objects.requireNonNull(headers.get(MqttHeaders.ID)).toString()));
                mqttMessage.setQos(Integer.parseInt(Objects.requireNonNull(headers.get(MqttHeaders.RECEIVED_QOS)).toString()));
                if (null != headers.get(MqttHeaders.RECEIVED_RETAINED)) {
                    mqttMessage.setRetained(Boolean.parseBoolean(Objects.requireNonNull(headers.get(MqttHeaders.RECEIVED_RETAINED)).toString()));
                }
                listener.onMessage(topic, mqttMessage);
            } catch (ClassNotFoundException e) {
                e.printStackTrace();
            }
        };
    }

    /**
     * 出通道
     */
    @Bean
    public MessageChannel mqttOutboundChannel() {
        return new DirectChannel();
    }

    /**
     * 出消息适配器
     */
    @Bean
    @ServiceActivator(inputChannel = "mqttOutboundChannel")
    public MessageHandler mqttOutbound() {
        MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(configProperties.getOutboundClientPrefix() + Instant.now().getEpochSecond(), mqttClientFactory());
        messageHandler.setAsync(true);
        messageHandler.setDefaultTopic(configProperties.getDefaultTopic());
        return messageHandler;
    }


}
